[AWS IoT Core] retained messages(保持メッセージ)を AWS IoT Device SDK v2 for Python で試してみました。
1 はじめに
IoT事業部の平内(SIN)です
AWS IoT Coreで、retained messages(保持メッセージ)のサポートがアナウンスされていました。
AWS IoT Core now supports MQTT retained messages
今回は、この動作をAWS IoT Device SDK v2 for Pythonで確認してみました。
2 retained messages
MQTTプロトコルフォーマットのフラグでは、PublishでRETAINというフラグがあります。
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718022
このフラグを設定されたメッセージは、永続的(AWS IoT Coreでは、最後に更新されてから3年後に自動的に消滅)に保持され、後からサブスクライブしても、その内容を取得する事ができます。
メッセージは、トピックに紐づいており、同じトピック名で新たにメッセージを送信すると上書きされます。
なお、既にサブスクライブしているクライアントには、通常のメッセージと同様に、直ちに送信されます。
この機能を利用すると、デバイスの初期設定や設定変更をパブリッシュする場面では、デバイスが、オンラインかどうかを追跡する必要が無くなります。
3 クライアント
サブスクライブ側もパブリッシュ側も、分かりやすいようにQoS=0(QoS.AT_MOST_ONCE)で書きました。
(1) サブスクライブ側
通常の実装と変わりません。
受信時のコールバックであるon_message_received()では、retainという変数を受け取る事ができ、通常のメッセージであれば、ここがFalse、永続メッセージであれば、Trueとなります。
import time from awscrt import io, mqtt from awsiot import mqtt_connection_builder endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com" root_ca = "./certs/root-CA.crt" cert = "./certs/xxxxxxxxxxxx-certificate.pem.crt" key = "./certs/xxxxxxxxxxxx-private.pem.key" topic = "retain-topic" client_id = "subscriber" def on_message_received(topic, payload, dup, qos, retain, **kwargs): print("received topic: {}".format(topic)) print("payload: {}".format(payload.decode())) print("retain: {}".format(retain)) event_loop_group = io.EventLoopGroup(1) host_resolver = io.DefaultHostResolver(event_loop_group) client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver) client = mqtt_connection_builder.mtls_from_path( endpoint = endpoint, cert_filepath = cert, pri_key_filepath = key, client_bootstrap = client_bootstrap, ca_filepath = root_ca, client_id = client_id, clean_session = False, keep_alive_secs = 6) connect_future = client.connect() connect_future.result() print("Connected! {}".format(client_id)) subscribe_future, _ = client.subscribe( topic = topic, qos = mqtt.QoS.AT_MOST_ONCE, callback = on_message_received) subscribe_future.result() while True: time.sleep(1) disconnect_future = client.disconnect() disconnect_future.result()
(2) パブリッシュ側
Publishでは、retainフラグをTrueとしています。
参考:https://awslabs.github.io/aws-crt-python/api/mqtt.html#awscrt.mqtt.Connection
import json import datetime from awscrt import io, mqtt from awsiot import mqtt_connection_builder endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com" root_ca = "./certs/root-CA.crt" cert = "./certs/xxxxxxxxxxxx-certificate.pem.crt" key = "./certs/xxxxxxxxxxxx-private.pem.key" topic = "retain-topic" client_id = "publiser" def on_message_received(topic, payload, **kwargs): print("received: {}".format(payload.decode())) event_loop_group = io.EventLoopGroup(1) host_resolver = io.DefaultHostResolver(event_loop_group) client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver) client = mqtt_connection_builder.mtls_from_path( endpoint = endpoint, cert_filepath = cert, pri_key_filepath = key, client_bootstrap = client_bootstrap, ca_filepath = root_ca, client_id = client_id, clean_session = False, keep_alive_secs = 6) connect_future = client.connect() connect_future.result() print("Connected! {}".format(client_id)) # Publih payload = { "timestamp": datetime.datetime.now().isoformat(sep=' ', timespec='milliseconds'), "status": 0 } client.publish( topic=topic, payload=json.dumps(payload), qos=mqtt.QoS.AT_MOST_ONCE, retain=True) disconnect_future = client.disconnect() disconnect_future.result()
4 削除
0バイトのペイロードで、メッセージを送信すると、保持メッセージは削除されます。
client.publish( topic=topic, payload="", qos=mqtt.QoS.AT_MOST_ONCE, retain=True)
5 iot:RetainPublish
RetainフラグをTrueにしたメッセージをPublishするには、Actionでiot:RetainPublishがAllowになっている必要があります。
例)
{ "Effect": "Allow", "Action": [ "iot:Publish", "iot:RetainPublish" ], "Resource": [ "arn:aws:iot:ap-northeast-1:xxxxxxxx:topic/retain-topic" ] }
6 AWS IoT コンソール
保持れたメッセージは、IoT Coreのコンソールから確認できます。
また、新たに発行(更新)したり、削除することも可能です。
7 動作確認
既にメッセージが保持されている場合に、サブスクライブすると、保持されたメッセージを受け取ります。
% python3 subscribe.py Connected! subscriber received topic: retain-topic payload: {"timestamp": "2021-08-25 08:25:02.507", "status": 2} retain: True
サブスクライブ中に、Retainフラグがセットされたメッセージが到着すると、通常のメッセージとして受信します。(retain: False)
received topic: retain-topic payload: {"timestamp": "2021-08-25 08:26:22.014", "status": 3} retain: False
サブスクライブ中に、Retainフラグがセットされ、ペーロードが0バイトのメッセージが到着しても、メッセージは受信しません。
8 最後に
retained messagesの特性をしっかり把握する事で、シャドウとはまた違った、便利な使い方ができそう気がしています。